feat: data plane transfer queue integration #2439
Conversation
Force-pushed bff0471 to d20a6ed
/ok to test d20a6ed
Force-pushed d20a6ed to e7f6a91
/ok to test e7f6a91
/ok to test f8add06
/ok to test c7cb642
Force-pushed c7cb642 to fa121a5
/ok to test fa121a5
Force-pushed fa121a5 to 8de60a8
/ok to test 8de60a8
Force-pushed 8de60a8 to aeb273c
/ok to test aeb273c
Force-pushed aeb273c to 1596562
/ok to test 1596562
Force-pushed 1596562 to abada7e
/ok to test abada7e
nemo_rl was wrapping ``np.ndarray(dtype=object)`` columns as ``NonTensorStack(*v.tolist())`` before storing them as leaves in the ``TensorDict`` passed to ``dp_client.kv_batch_put``. Under ``tensordict==0.12.2``, ``bulk[k]`` on such a leaf returns an internal ``LinkedList``: the ``NonTensorStack`` class identity is lost, and calling ``.contiguous()`` on the parent ``TensorDict`` collapses the leaf to an empty ``TensorDict``, dropping the wrapped Python objects entirely.

Symptom: simple-backend GRPO recipes crash at the first ``kv_batch_get`` for ``content`` with ``RuntimeError: All tensordicts must be non-tensors`` inside ``_pack_field_values``, because every batch position is an empty ``TensorDict`` instead of the expected per-sample string.

Fix:
- ``nemo_rl/experience/sync_rollout_actor.py``: pass object arrays through as ``np.ndarray`` (canonical site, full rationale).
- ``nemo_rl/data_plane/column_io.py``: same on the ``write_columns`` path; refers to ``kv_first_write``.
- ``nemo_rl/data_plane/adapters/transfer_queue.py``: drop the ``.contiguous()`` call. TQ's encoder forces ``.contiguous()`` per tensor leaf itself, and on a parent TD with non-tensor leaves the call is destructive.

``TensorDict`` preserves ``ndarray(dtype=object)`` identity through ``__getitem__``, and TQ's encoder serializes object arrays via ``CUSTOM_TYPE_PICKLE``. No TQ patch required. As a bonus, the new path skips the ``.tolist()`` materialization that the old wrapper performed per write.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
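A minimal sketch of the new pass-through (the helper name ``to_bulk_leaf`` is illustrative; the real call sites are in ``sync_rollout_actor.py`` and ``column_io.py``):

```python
import numpy as np
import torch

def to_bulk_leaf(v):
    """Illustrative only: convert one rollout column into a TensorDict leaf.

    Object-dtype numpy arrays are stored as-is instead of being re-wrapped as
    NonTensorStack(*v.tolist()), whose class identity tensordict==0.12.2 loses
    and whose contents a parent-level .contiguous() collapses away.
    """
    if isinstance(v, torch.Tensor):
        return v
    if isinstance(v, np.ndarray) and v.dtype == object:
        return v  # TQ's encoder serializes this via CUSTOM_TYPE_PICKLE
    return torch.as_tensor(v)
```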
…cate writes

Under TP-only configs (e.g. TP=2 CP=1), every rank in the TP group was calling ``TQWorkerMixin._write_back`` and racing to write the same per-sample keys (``prev_logprobs`` etc.). On the simple backend the second writer's bytes silently overwrote the first (last-write-wins on a Python dict, benign because the data is identical post-all-reduce). On the mooncake_cpu backend the ``MooncakeStore`` master rejected the second writer's ``BatchPutEnd`` with ``ILLEGAL_CLIENT`` (-601) because the metadata ``client_id`` was set to the first writer's UUID, so the recipe crashed at the first ``kv_batch_put`` of the offending step.

The existing leader check ``_is_replica_leader`` correctly returns False for non-leaders, but only when ``_get_replica_group`` returns a non-None group. Subclasses gate ``_get_replica_group`` on ``CP > 1`` as a fetch-path optimization (the docstring explicitly calls out "matches the qwen3-mcore TP=2 baseline"). That gate incorrectly disables the leader check on the write-back path too: ``CP=1 ⇒ replica_group is None ⇒ _is_replica_leader → True for every rank``.

Split the write-back leader check from the replica-group machinery (see the sketch below):
- ``TQWorkerMixin._is_writeback_leader``: default delegates to ``_is_replica_leader`` (preserves behavior for workers with no parallelism).
- ``MegatronPolicyWorkerImpl._is_writeback_leader``: override gates on ``(tp_rank, cp_rank, pp_rank) == (0, 0, 0)`` via mcore ``parallel_state``, unconditional, no CP gate.
- ``DTensorPolicyWorkerV2Impl._is_writeback_leader``: same idea but using ``device_mesh["cp"].get_local_rank()`` / ``device_mesh["tp"].get_local_rank()``.
- ``_write_back`` switched from ``_is_replica_leader`` to ``_is_writeback_leader``.

Correctness: the simple backend's last-write-wins already proves the data is identical across TP siblings (DSv3 32n8g TP=32 simple passes its closing ``check_metrics.py`` with the same multi-write pattern; gating to leader-only is semantically equivalent). Mooncake's race is eliminated because exactly one client now writes each key.

Perf: ``tp_size - 1`` redundant ``kv_batch_put`` calls per training step are now skipped on every backend, not just mooncake_cpu.

Verified by JOBID 11758259 (1n8g megatron TP=2 + temp/top-p/top-k sampling on mooncake_cpu): past Step 11/500 with no -601, whereas every prior attempt of this recipe crashed at Step 1 within ~5 min.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
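A sketch of the mcore-side override described above, assuming the standard ``megatron.core.parallel_state`` rank helpers; the real code lives in ``megatron_policy_worker.py`` and only the gating logic is shown:

```python
from megatron.core import parallel_state

class MegatronPolicyWorkerImpl:  # illustrative subset of the real worker
    def _is_writeback_leader(self) -> bool:
        """Exactly one rank per replica writes back to TQ.

        Unconditional gate on (tp, cp, pp) == (0, 0, 0); unlike
        _is_replica_leader it does not depend on _get_replica_group,
        so it stays correct when CP == 1.
        """
        return (
            parallel_state.get_tensor_model_parallel_rank() == 0
            and parallel_state.get_context_parallel_rank() == 0
            and parallel_state.get_pipeline_model_parallel_rank() == 0
        )
```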
/ok to test 3ee70c4
Pre-commit auto-fix changes flagged by CI:
- nemo_rl/data_plane/column_io.py: remove unused NonTensorStack import (only referenced in docstrings/comments now).
- nemo_rl/experience/sync_rollout_actor.py: drop blank line between docstring close and first statement.

Plus two D205 (blank-line-between-summary-and-description) fixes that ruff check flagged after the auto-fixes:
- nemo_rl/models/policy/workers/dtensor_policy_worker_v2.py:216
- nemo_rl/models/policy/workers/megatron_policy_worker.py:117

Both _is_writeback_leader docstrings had a summary line ending with 'See' and the description continuing on the next line without a blank separator.

Verified: ruff check + ruff format --check both pass on nemo_rl/, tests/, examples/, docker/, docs/.

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
/ok to test 209d266
yuki-97 left a comment:

@ZhiyuLi-Nvidia thanks for the great work! I went through grpo_sync.py and left some comments.
| "timestamp": time.time(), | ||
| } | ||
|
|
||
| # When the data plane is enabled, replace the in-memory dict |
just curious, are the changes in this file (nemo_rl/algorithms/async_utils/trajectory_collector.py) an example of how to use TQ?
since I think this file will only be used in async grpo and this PR won't actually use it?

Correct: this part is async-grpo plumbing for the upcoming grpo_async integration and not yet part of this PR. I'll drop it to keep this PR's scope sync-only, and we can re-introduce it later together with the async refactoring.
Reverted the change in nemo_rl/algorithms/async_utils/trajectory_collector.py.
| from nemo_rl.models.generation.interfaces import GenerationInterface |
| def kv_first_write( |
can we move it to be a method of DataPlaneClient?

Thank you @yuki-97.
kv_first_write is driver-aware in the original design, so DataPlaneClient isn't the right place. To address your comment, I refactored this part for better rollout-shape decoupling, so what's left is small enough that the question dissolves.
Now kv_first_write has moved to column_io.py, sync_rollout_actor.py is −95 LOC, and the client stays transport-only.
| step_finished=True, |
| ) |
| batch_cache = None |
Good catch. Removed this unused variable.
| self._dp_client = build_data_plane_client(dp_cfg, bootstrap=True) | ||
| self._tq_partition_id = tq_partition_id |
looks like the two are used in many places outside the class in grpo_sync.py, wdyt about making them public?

Good comment.
Fixed.
| # no need to refetch the bulk schema. Logprob / | ||
| # mask / adv columns added later are irrelevant | ||
| # here. | ||
| _calib_fields = [ |
shall we just choose what we need here instead of a not-in exclusion?

Thank you for the comments.
A pure positive list isn't feasible, as multimodal field names are model-specific (pixel_values, image_grid_thw, audio extras, …) and there's no static enumeration of them.
Here's the fix to address your concern: the exclude set is now a named constant, DP_CALIB_EXCLUDED_FIELDS, in data_plane/schema.py.
| partition_id=partition_id, | ||
| task_name=None, | ||
| keys=list(keys), | ||
| fields=field_names, |
seems meta.fields only contains the fields put this time instead of all fields. is that what we want?

Yes, that's intentional. The point is to minimize transfer: each consumer (logprob, training, calibration) pulls only its needed columns via select_fields. meta.fields lists what's available; the wire only carries what's selected.
| ) |
| if self.policy_generation is not None: |
| self.policy_generation.finish_generation() |
just curious, why do we need policy_generation.finish_generation() here?

It is the exact same policy_generation.finish_generation(), performing the same GPU cleanup on the same worker. The only change is who triggers it and when:
- Old way (legacy grpo.py): the driver called it as a separate step after the rollout finished.
- New way (grpo_sync.py): the rollout actor calls it automatically as part of wrapping up the rollout, bundled into the same Ray round-trip as the rollout itself, so the driver gets back everything it needs (metadata, metrics) in one call instead of three.
| gen_metrics = None |
| return meta, slice_extras, rollout_metrics, gen_metrics |
| def finish_generation(self) -> None: |
seems grpo_sync.py still just uses policy_generation.finish_generation(), and I think it's good to just use that.
so do we still need this function in the class? seems it's not used anywhere.
also for get_logger_metrics and clear_logger_metrics.

Yep, good catch. Removed all these unused APIs.

The three wrapper methods were never called by anyone outside the class. The actual work (resetting accumulators before rollout, capturing metrics after, releasing GPU resources) still happens correctly through direct policy_generation calls inside rollout_and_first_put, and gen_metrics is returned to the driver in the result tuple. Removed the unused wrappers; behavior unchanged.
Modified it to make the lifecycle more transparent.
The async-grpo data-plane plumbing in AsyncTrajectoryCollector (dp_cfg constructor param, _ensure_dp_client(), and the kv_batch_put branch in _run_prompt_group_worker) isn't exercised in the sync data-plane PR. Re-introduce it together with the async-grpo refactor.

Per yuki-97 PR review (#1).
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
DataPlaneClient stays a pure transport. Rollout-shape concerns (key layout, batch geometry) move to the producer.

* New codec.pack_jagged_fields() owns the single wire-layout transform (jagged pack + np.ndarray(dtype=object) passthrough). Previously duplicated across kv_first_write, write_columns, and worker_mixin._write_back.
* kv_first_write moves to data_plane/column_io.py next to write_columns and takes pre-minted keys (no longer (uids, n_gen)). The rollout actor builds the keys inline at the call site, matching how verl's AgentLoopWorkerTQ._agent_loop_postprocess draws the same line.
* worker_mixin._write_back collapses to write_columns(...).
* Unit tests updated for the new keys= signature.

Net: sync_rollout_actor.py -95 LOC; client signature unchanged.

Per yuki-97 PR review (#2).
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
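An illustrative sketch of the wire-layout transform, under the assumption that "jagged pack" means flattened values plus a lengths column; the real pack_jagged_fields signature and key naming live in data_plane/codec.py and may differ:

```python
import numpy as np
import torch

def pack_jagged_fields(columns: dict[str, list]) -> dict[str, object]:
    """Sketch only: one place that owns the wire-layout transform.

    Variable-length tensor rows are flattened into a single 1-D tensor plus a
    lengths tensor; object-dtype columns pass through as np.ndarray(dtype=object).
    """
    packed: dict[str, object] = {}
    for name, rows in columns.items():
        if rows and isinstance(rows[0], torch.Tensor):
            packed[name] = torch.cat([r.reshape(-1) for r in rows])
            packed[f"{name}.lengths"] = torch.tensor([r.numel() for r in rows])
        else:
            packed[name] = np.array(rows, dtype=object)  # pickled by TQ's encoder
    return packed
```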
* Remove test_materialize_rejects_non_tensor_leaves: codec.materialize
now deliberately accepts NonTensorData (commit c3ac42342, simple-
backend wire-strip survival). The "wire is tensors only" assertion
predated that fix and contradicts current behavior.
* Remove test_no_data_plane_in_master_config: TODO gate that fires
until legacy grpo.py is retired; tracked separately.
* Rewrite test_codec_wire_stripped.py to cover the production decode
paths via:
- direct unit coverage of unwrap_wire_stripped_payload (per-item
helper; doesn't need a NonTensorStack construction);
- end-to-end materialize coverage via patch.object(stack, "tolist")
to simulate the wire-stripped state (tensordict>=0.12.2 rejects
NonTensorStack(TensorDict({}, batch_size=[])) at construction).
Wire round-trip e2e for object columns is already covered by
tests/data_plane/functional/test_tq_lifecycle.py.
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
* grpo_sync.py: remove unused batch_cache = None (leftover from grpo.py-style dynamic sampling; grpo_sync threads survivors through pending_meta / pending_slice).
* TQPolicy: rename _dp_client -> dp_client and _tq_partition_id -> tq_partition_id. They are read from grpo_sync.py in 7 places, so the underscore prefix was misleading. The constructor kwarg tq_partition_id already matched the new attribute name.
* Update README + data_plane_api_lifecycle docs example snippets.

Per yuki-97 PR review (#3, #4).
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
…ema constant
Replaces the inline 6-field exclusion tuple in grpo_sync.py with
DP_CALIB_EXCLUDED_FIELDS in data_plane/schema.py, derived from
DP_TRAIN_FIELDS - {input_ids, input_lengths}. A new column added to
DP_TRAIN_FIELDS is now excluded-by-default from KV-scale calibration;
opting it in requires editing the private base set explicitly.
Multimodal extras (pixel_values, image_grid_thw, etc.) pass through
unchanged because they are not in DP_TRAIN_FIELDS.
Per yuki-97 PR review (#5).
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
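A hypothetical sketch of the constant's derivation; the actual membership of DP_TRAIN_FIELDS is defined in data_plane/schema.py, and the field names below beyond input_ids / input_lengths / prev_logprobs are placeholders:

```python
# Placeholder members for illustration; consult data_plane/schema.py for the
# real DP_TRAIN_FIELDS.
_DP_TRAIN_FIELDS: frozenset[str] = frozenset(
    {"input_ids", "input_lengths", "prev_logprobs", "token_mask", "advantages"}
)

# Columns added after the first write are excluded from KV-scale calibration by
# default; opting one in means editing the private base set explicitly.
DP_CALIB_EXCLUDED_FIELDS: frozenset[str] = frozenset(
    _DP_TRAIN_FIELDS - {"input_ids", "input_lengths"}
)
```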
Silent over-fetch was possible when callers omitted select_fields: the noop adapter fetched every registered field via set intersection; the TQ adapter forwarded None to the backend. Bulk schemas are wide and fetching everything is the most expensive shape the wire can take.

select_fields is now a required list[str] on DataPlaneClient.kv_batch_get and all concrete implementations. Callers must name what they read; fetch-all is still possible by passing list(meta.fields) explicitly.

Also: worker_mixin internal call sites use list(meta.fields) directly (fail-loud TypeError if meta.fields is None, rather than silently producing an empty TensorDict).

Per yuki-97 PR review (#6).
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
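Illustrative call shapes after this change (dp_client, keys, and meta come from earlier setup; everything except select_fields is an assumed name, and the real kv_batch_get signature lives in nemo_rl/data_plane/):

```python
# A consumer names exactly the columns it reads ...
batch = dp_client.kv_batch_get(
    keys,  # per-sample keys from the KVBatchMeta (parameter name assumed)
    select_fields=["input_ids", "input_lengths", "prev_logprobs"],
)

# ... and fetch-all is still possible, but must be explicit:
batch = dp_client.kv_batch_get(keys, select_fields=list(meta.fields))
```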
…ifecycle
Remove three actor-level wrappers (finish_generation, get_logger_metrics,
clear_logger_metrics) that had zero external callers. The actor's internal
code already calls self.policy_generation.{...} directly at the right
points inside rollout_to_tq; the wrappers added indirection without value.
Rewrite the rollout_to_tq docstring to list all six steps bundled into
the single Ray RPC (reset metrics -> rollout -> flatten -> TQ put ->
release GPU -> capture metrics), making the lifecycle visible without
having to read the method body.
Per yuki-97 PR review (#7, #8).
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
/ok to test eecbcc4
… discovery
tests/unit/L0_Unit_Tests_*.sh hard-code TEST_PATHS=('unit/'), so any test outside tests/unit/ is silently skipped. Our data_plane suite lived at tests/data_plane/unit/ and was never collected by CI.
Move all 19 unit tests + conftest + __init__ into tests/unit/data_plane/ via git mv (one was untracked, so plain mv). The tests/data_plane/functional/ tree stays where it is — those are Tier 2 (Ray + TQ), need a separate runner.
Plus four drive-by fixes flagged by CI lint:
- nemo_rl/data_plane/docs/data_plane_api_lifecycle.md → data-plane-api-lifecycle.md (new pre-commit hook disallows underscores in .md filenames).
- nemo_rl/data_plane/column_io.py:158: change variable annotation Mapping -> dict so pyrefly accepts the dict-comp result at the pack_jagged_fields call site (the function signature is dict[str, ...]). Mapping import drops as unused; ruff auto-fixes.
- nemo_rl/data_plane/worker_mixin.py:224,249: pyrefly no-matching-overload on list(meta.fields) where meta.fields: list[str] | None. Use # type: ignore[no-matching-overload] rather than list(meta.fields or []) — the runtime contract guarantees meta.fields is non-None at these call sites; silently substituting [] would mean fetch-nothing which is wrong.
- nemo_rl/experience/sync_rollout_actor.py: add 'import torch' (F821 — module used torch.zeros_like and isinstance(v, torch.Tensor) without importing torch).
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
/ok to test 25dcf41
… tests

After the move under tests/unit/data_plane/ (commit 25dcf41), the CI ruff hooks would auto-fix two minor things:
- test_codec_wire_stripped.py: drop extra blank line after import block
- test_correctness.py: merge two 'from column_io import ...' lines (import-sort)

Apply locally so the pre-commit auto-fix step on CI has nothing left to modify (avoiding the 'files were modified by this hook' failure on the next run).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
/ok to test fff691d
The link in nemo-gym-integration.md pointed at docs.nvidia.com/nemo/gym/latest/about/concepts/core-components.html, which returns 404. The page is actually served at docs.nvidia.com/nemo/gym/about/core-components (no /latest/, no /concepts/) per the gym docs sitemap. Update the source link instead of suppressing it via the linkcheck false-positives workaround.

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
/ok to test c069f92
| """ | ||
|
|
||
| @abstractmethod | ||
| def check_consumption_status( |
There was a problem hiding this comment.
i can't seem to find any uses of this function outside of tests. is this needed?
| """ | ||
|
|
||
| @abstractmethod | ||
| def claim_meta( |
There was a problem hiding this comment.
same with this one. i can't seem to find it used anywhere outside of tests. intentional?
| """ | ||
|
|
||
| @abstractmethod | ||
| def get_data( |
There was a problem hiding this comment.
i can't seem to find any uses of this function outside of tests. is this needed?
| policy_generation = policy # type: ignore | ||
| NEED_REFIT = False | ||
| POLICY_GENERATION_STALE = True | ||
| assert policy_generation is not None # for mypy type check |
| adv_estimator = _create_advantage_estimator(master_config) |
| # ── Data-plane setup (mandatory in the sync trainer) ─────────────── |
| # Sync trainer requires a TQ-mediated policy. The TQPolicy ctor |

Suggested change:
| # Sync trainer requires a TQ-mediated policy. The TQPolicy ctor |
| # Sync trainer requires a TQ-mediated policy. The TQPolicy actor |
| } | ||
| if not hasattr(policy, "dp_cfg"): | ||
| raise ValueError( | ||
| "grpo_train_sync requires a TQ-mediated policy " |
we should probably maintain a set of issues to clean this stuff up. i can imagine things like this will be forgotten unless we track them somehow. Maybe a GitHub issue for each small thing is too tedious, so maybe just one "cleanup" tracking issue for things like this.
| # ────────────────────────────────────────────────────────────────────────── |
| def _get_local_node_ip() -> str: |
This only rejects link-local (169.254/16) but not loopback (127.0.0.1). On hosts where gethostname() resolves to 127.0.0.1 via /etc/hosts, announcing that to mooncake peers as MC_TCP_BIND_ADDRESS would cause cross-node connection refused. The PR's own test notes "Loopback is NOT skipped by the current implementation."
Consider also rejecting is_loopback.
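A hypothetical sketch of the suggested behavior (not the PR's current implementation), assuming the usual socket/ipaddress stdlib calls:

```python
import ipaddress
import socket

def _get_local_node_ip() -> str:
    """Skip both link-local (169.254/16) and loopback (127.0.0.0/8) candidates
    before announcing an address to mooncake peers via MC_TCP_BIND_ADDRESS."""
    candidates = socket.gethostbyname_ex(socket.gethostname())[2]
    for raw in candidates:
        addr = ipaddress.ip_address(raw)
        if addr.is_link_local or addr.is_loopback:
            continue
        return raw
    # Fallback: a UDP "connect" that sends no packets but lets the kernel pick
    # the outbound interface toward an external address.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(("8.8.8.8", 80))
        return s.getsockname()[0]
```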
| """Inject ``{"pip": ["TransferQueue==0.1.6"]}`` into TQ's actor ``.options()``. | ||
|
|
||
| TQ spawns ``SimpleStorageUnit`` and ``TransferQueueController`` via | ||
| ``Cls.options(...).remote(...)`` without a runtime_env, so they | ||
| inherit the job-level env. In a multi-node container deployment | ||
| where each node has its own ``/opt/nemo_rl_venv``, the driver's | ||
| ``uv sync`` only updates ray-head's venv and a worker-node actor | ||
| fails with ``ModuleNotFoundError``. This monkey-patch makes Ray | ||
| pip-install TQ into a per-actor runtime_env on first spawn (cached | ||
| per-node by Ray afterwards). Idempotent. Couples us to TQ's internal | ||
| class layout — if TQ restructures, this becomes a no-op with a | ||
| logged warning and we fall back to per-node ``uv sync``. | ||
| """ | ||
| global _TQ_RUNTIME_ENV_PATCHED | ||
| if _TQ_RUNTIME_ENV_PATCHED: | ||
| return | ||
|
|
||
| runtime_env = {"pip": ["TransferQueue==0.1.6"]} |
There was a problem hiding this comment.
not quite sure why this is pinned when the pyproject.toml uses a diff version. seems brittle
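A rough sketch of the monkey-patch shape described in the quoted docstring; how the patch discovers TQ's classes, the warning/fallback path, and the function name here are all assumptions:

```python
import functools

_TQ_RUNTIME_ENV_PATCHED = False

def _patch_tq_runtime_env(tq_module, runtime_env: dict) -> None:
    """Sketch only: wrap Cls.options(...) on TQ's actor classes so every
    TQ-spawned actor carries a pip runtime_env."""
    global _TQ_RUNTIME_ENV_PATCHED
    if _TQ_RUNTIME_ENV_PATCHED:
        return
    for cls_name in ("SimpleStorageUnit", "TransferQueueController"):
        cls = getattr(tq_module, cls_name, None)
        if cls is None:
            continue  # TQ restructured its internals; fall back to per-node uv sync
        original_options = cls.options

        @functools.wraps(original_options)
        def options(*args, _orig=original_options, **kwargs):
            kwargs.setdefault("runtime_env", runtime_env)
            return _orig(*args, **kwargs)

        cls.options = options
    _TQ_RUNTIME_ENV_PATCHED = True
```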
| tq = _tq() |
| base = OmegaConf.load(str(resources.files("transfer_queue") / "config.yaml")) |
| backend = cfg.get("backend", "simple") |
Per project config conventions, YAML is the single source of truth for defaults. These lines introduce hidden defaults for backend ("simple"), storage_capacity (1_000_000), and num_storage_units (2). Same pattern at lines 262, 266, and 410 (global_segment_size, local_buffer_size, get_meta_poll_interval_s).
There is no data_plane: block in any exemplar YAML under examples/configs/, so the only source of these defaults is Python code — exactly what the convention exists to prevent.
Consider adding a data_plane: block to the relevant exemplar YAMLs with these defaults expressed there, and replacing the in-code defaults with .get(key) / .get(key, None).
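A rough sketch of what the reviewer suggests, expressed with OmegaConf for brevity; the key names and values come from the comment above, everything else (including where the exemplar block would live) is assumed:

```python
from omegaconf import OmegaConf

# Defaults expressed in an exemplar config (would live as a data_plane: block
# in a YAML under examples/configs/ rather than in Python).
exemplar = OmegaConf.create(
    {
        "data_plane": {
            "backend": "simple",
            "storage_capacity": 1_000_000,
            "num_storage_units": 2,
        }
    }
)

# In-code reads then carry no hidden fallback:
cfg = exemplar.data_plane
backend = cfg.get("backend")            # None if the YAML omits it, instead of "simple"
storage_capacity = cfg.get("storage_capacity")
```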
| # ── (A) task-mediated ─────────────────────────────────────────────── |
| @abstractmethod |
| def register_partition( |

is there any case where this doesn't mean "register step"? i feel the naming may be more approachable if we can map it to a training loop's semantics
What does this PR do ?
Summary
driver only handles per-sample slices and KVBatchMeta.
Scope
(read_columns / write_columns).
rollout 1-hop put → meta-driven logprob/train → kv_clear.
Test
https://wandb.ai/nvidia/nemorl-dataplane-zhiyul?nw=nwuserzhiyul
Usage
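A hedged sketch of the end-to-end lifecycle described in Scope (rollout 1-hop put → meta-driven logprob/train fetch → kv_clear); the import path and every argument name other than select_fields are assumptions, so consult nemo_rl/data_plane/ for the real signatures:

```python
# Sketch only; names marked below are assumed, not the actual API.
from nemo_rl.data_plane import build_data_plane_client  # assumed import path

dp_cfg = {"backend": "simple"}   # placeholder config
tq_partition_id = "step_0"       # placeholder partition id

dp_client = build_data_plane_client(dp_cfg, bootstrap=True)

# Rollout actor: one put of all per-sample columns, returning a KVBatchMeta.
meta = dp_client.kv_batch_put(bulk, partition_id=tq_partition_id)  # bulk: TensorDict

# Logprob / training consumers: fetch only the columns they need, by name.
batch = dp_client.kv_batch_get(
    meta.keys,  # per-sample keys (attribute name assumed)
    select_fields=["input_ids", "input_lengths", "prev_logprobs"],
)

# End of step: release the partition's storage.
dp_client.kv_clear(tq_partition_id)
```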
Before your PR is "Ready for review"
Pre checks:
Additional Information